package com.tang.common.config;

import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.otter.canal.client.*;
import com.alibaba.otter.canal.protocol.Message;
import com.tang.common.event.CanalCacheEvent;
import com.tang.common.properties.CanalProperties;
import com.tang.module.canal.handle.CanalCoreDispenser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

/**
 * canal启动器
 * @author tang
 * @date 2021/9/10 14:23
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "canal",name = "enable",havingValue = "true")
@EnableConfigurationProperties(CanalProperties.class)
public class CanalConfig implements ApplicationRunner {

	private final static  Integer BATCH_SIZE = 1000;

	/**
	 * canal已经重试连接的次数，每次重试间隔1分钟
	 */
	private static Integer currentRetry = 0;

	/**
	 * canal重试的时间间隔，默认1分钟
	 */
	private final static Integer INTERVAL_MINUTES = 1;

	/**
	 * canal最大的重试次数，超过该次数即停止连接canal
	 */
	private final static Integer TOTAL_NUMBER_RETRIES = 10;


	@Autowired
	private CanalProperties canalProperties;

	@Resource
	private ApplicationEventPublisher applicationEventPublisher;


	@Override
	public void run(ApplicationArguments args) throws Exception {
		//在hutool公共线程池中运行canal,队列长度最大1024
		ThreadUtil.execute(this::run);
	}

	public void run() {
		// 创建链接
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getIp(), canalProperties.getPort()), canalProperties.getExample(), "", "");
		try {
			//打开连接
			connector.connect();
			System.out.println("***********************************启动canal >>>>>" + canalProperties.getIp() + ":" + canalProperties.getPort() + "******************************************");
			//订阅数据库表,全部表
			connector.subscribe(canalProperties.getFilter());
			//回滚到未进行ack的地方，下次fetch的时候，可以从最后一个没有ack的地方开始拿
			connector.rollback();
			while (true) {
				// 获取指定数量的数据
				Message message = connector.getWithoutAck(BATCH_SIZE);
				//获取批量ID
				long batchId = message.getId();
				//获取批量的数量
				int size = message.getEntries().size();
				//如果没有数据
				if (batchId == -1 || size == 0) {
					try {
						//线程休眠2秒
						TimeUnit.SECONDS.sleep(2);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				} else {
					//发布canal缓存更新事件
					applicationEventPublisher.publishEvent(new CanalCacheEvent(message.getEntries()));
				}
				//进行 batch id 的确认。确认之后，小于等于此 batchId 的 Message 都会被确认。
				connector.ack(batchId);
			}
		} catch (Exception e) {
			log.error("canal连接异常: {},已经重试{}次",e.getMessage(),currentRetry);
			//默认延迟1分钟再次进行重试
			try {
				TimeUnit.MINUTES.sleep(INTERVAL_MINUTES);
			} catch (InterruptedException interruptedException) {
				interruptedException.printStackTrace();
			}
			//重新连接
			if (currentRetry <= TOTAL_NUMBER_RETRIES){
				currentRetry ++;
				this.run();
			}

		} finally {
			connector.disconnect();
		}
	}
}
